[FLINK-39436][table] Allow late data in PTFs#27935
Conversation
gustavodemorais
left a comment
There was a problem hiding this comment.
The PR looks good in general to me!
Two nits: can you rename the PR title and commit title to use this format [FLINK-39436][table]?
| "+I[Alice, {Processing input row +I[Alice, 1, 1970-01-01T00:00:00.001Z] at time 1 watermark null}, 1970-01-01T00:00:00.001Z]", | ||
| "+I[Bob, {Processing input row +I[Bob, 2, 1970-01-01T00:00:00.002Z] at time 2 watermark null}, 1970-01-01T00:00:00.002Z]") | ||
| .consumedAfterRestore( | ||
| "+I[Bob, {Timer timeout1 fired at time 1 watermark 9223372036854775807}, 1970-01-01T00:00:00.001Z]", |
There was a problem hiding this comment.
One thing that was a bit confusing for me is why we have watermark 9223372036854775807 here? Do we emit this max watermark only for testing or do our sources actually emit this when all values have been read? I wondered if that basically makes the user not know if something is late anymore since
There was a problem hiding this comment.
Sources emit this when all values have been read. It will flush all remaining timers and windows and mark the job as complete.
There was a problem hiding this comment.
Makes sense. This is an unrelated topic to the PR so definitely not a blocker: but I guess it makes sense for users to be aware of this so they can write their logic accordingly as well? Maybe it's already documented somewhere and then there's nothing to do 🙂
There was a problem hiding this comment.
And for this PR: maybe add a short comment Sources emit 9223372036854775807 as a MAX watermark after all values have been read. It makes understanding the test a bit easier
There was a problem hiding this comment.
AFAICT this is a property of the testing framework.
I can see that this looks surprising, but I'm not sure if it makes sense to put a comment everywhere the framework is being used.
|
One thing worth noting: removing the watermark guard means past-time timers registered from onTimer() fire immediately (within the same advanceWatermark loop). But it also means something like this lead to infinite loops: AI says this is the same behavior as standard Flink KeyedProcessFunction, so the design is consistent - just wanted to flag that PTF users were previously protected from this and they may shoot themselves in the foot more easily now. Maybe worth a short note in the timers section of ptfs.md along the lines of "timers registered at or below the current watermark fire immediately at the next watermark advance; avoid unconditional re-registration of past-time timers from onTimer() as this will cause an infinite loop." |
twalthr
left a comment
There was a problem hiding this comment.
Thanks for the PR @fhueske. I added some minor comments. Could we update the JavaDoc in ProcessTableFunction and the documentation for late events. Late events deserve a dedicated section, before * <h2>Efficiency and Design Principles</h2>.
| // Register at wm+1 to always target the immediate next watermark: the timer fires | ||
| // exactly once per watermark advance, and each new row re-registers the timer for the | ||
| // following watermark step, demonstrating repeated timer re-registration. | ||
| long timer = wm == null || wm < 0 ? 1 : wm + 1; |
There was a problem hiding this comment.
wouldn't this be more correct and allowing for watermarks before epoch.
| long timer = wm == null || wm < 0 ? 1 : wm + 1; | |
| long timer = wm == null ? Long.MIN_VALUE + 1: wm + 1; |
There was a problem hiding this comment.
This is just a function for testing purposes and we assert against the behavior that we implement.
So I think it doesn't really matter.
Or is there an (edge) case that would be covered with your suggestion that isn't with the current version?
857e77c to
a08f393
Compare
|
Thanks for your reviews @gustavodemorais and @twalthr. |
gustavodemorais
left a comment
There was a problem hiding this comment.
The PR looks good to me! Thanks, @fhueske
- I think you have to run spotless to make the CI green
a08f393 to
57b157d
Compare
57b157d to
1781123
Compare
| Registering a timer for a time that is less than or equal to the current watermark is allowed. | ||
| If registered from within `eval()`, the timer fires on the next watermark advance. If registered | ||
| from within `onTimer()`, the timer fires immediately after the current timer finishes. Note that | ||
| unconditionally re-registering a past-time timer from within `onTimer()` causes an infinite loop. |
There was a problem hiding this comment.
Explicitly warning in Javadoc that 'unconditionally re-registering a past-time timer from within onTimer() causes an infinite loop';
Is it necessary to consider adding protection, such as adding detection logic.
Alternatively, it may be necessary to add this infinite loop error test in the test, such as adding the timer for 'alice-again' in LateTimersFunction # eval
There was a problem hiding this comment.
This is the same behavior as for other functions with access to timers such as KeyedProcessFunction.
Before this change, we ignored the registrations of timers with a timestamp < current_watermark, which is inconsistent with Flink's other functions and surprising for Flink users.
We might be able to introduce guards against such behavior, but I don't think it is necessary. I don't think this is a common problem and adding logic to detect and prevent such logic would cause more problems than it solves.
| @@ -1115,35 +1116,61 @@ public class ProcessTableFunctionTestPrograms { | |||
| public static final TableTestProgram PROCESS_LATE_EVENTS = | |||
There was a problem hiding this comment.
Is it possible to add PROCESS_LATE_EVENTS_RESTORE.
1781123 to
e63b9b3
Compare
|
Thanks for the review @Au-Miner. Best, Fabian |
Previously, late events (rowtime <= watermark) were silently dropped before reaching PTF eval(), and timer registrations for times <= watermark were also silently dropped. This change removes both restrictions: - ProcessTableRunner: remove the early-return guard in processEval() so that late events are passed to the PTF's eval() method. - WritableInternalTimeContext: remove the watermark check in registerOnTimeInternal() so that timers can be registered for past times. Such timers fire immediately at the next watermark advance, including when registered from within onTimer(). The previous guard also had an unintended side effect: any call to replaceNamedTimer() with a past time would delete the existing timer entry but then silently drop the new registration, leaving the named timer in a state where it appeared un-registered but the old timer was gone. - AbstractProcessTableOperator: remove the fired named timer's state entry before invoking onTimer() to prevent stale entries from accumulating in the named timers map state. Tests are updated to reflect the new semantics: - PROCESS_LATE_EVENTS: demonstrates that late events enter eval(), can register timers (including for past times), and that such timers fire immediately at the next watermark advance. - PROCESS_ROW_LATE_EVENTS (new): verifies the same for row-semantics PTFs. - PROCESS_OPTIONAL_ON_TIME / PROCESS_NAMED_TIMERS: updated to reflect that timer registrations for past times are no longer dropped. Previously, once the watermark passed the registered time, the timer was silently discarded; now it fires immediately.
e63b9b3 to
521b017
Compare
Previously, late events (rowtime <= watermark) were silently dropped before reaching PTF eval(), and timer registrations for times <= watermark were also silently dropped. This change removes both restrictions: - ProcessTableRunner: remove the early-return guard in processEval() so that late events are passed to the PTF's eval() method. - WritableInternalTimeContext: remove the watermark check in registerOnTimeInternal() so that timers can be registered for past times. Such timers fire immediately at the next watermark advance, including when registered from within onTimer(). The previous guard also had an unintended side effect: any call to replaceNamedTimer() with a past time would delete the existing timer entry but then silently drop the new registration, leaving the named timer in a state where it appeared un-registered but the old timer was gone. - AbstractProcessTableOperator: remove the fired named timer's state entry before invoking onTimer() to prevent stale entries from accumulating in the named timers map state. Tests are updated to reflect the new semantics: - PROCESS_LATE_EVENTS: demonstrates that late events enter eval(), can register timers (including for past times), and that such timers fire immediately at the next watermark advance. - PROCESS_ROW_LATE_EVENTS (new): verifies the same for row-semantics PTFs. - PROCESS_OPTIONAL_ON_TIME / PROCESS_NAMED_TIMERS: updated to reflect that timer registrations for past times are no longer dropped. Previously, once the watermark passed the registered time, the timer was silently discarded; now it fires immediately.
What is the purpose of the change
Previously, late events (rowtime <= watermark) were silently dropped before reaching PTF eval(), and timer registrations for times <= watermark were also silently dropped. This change removes both restrictions.
These are breaking changes that are part of FLIP-565 which was approved.
I checked that the docs don't need to be adjusted because the earlier behavior of PTFs dropping late data was not documented. The new behavior is aligned with other functions in Flink.
Brief change log
ProcessTableRunner: remove the early-return guard in processEval() so that late events are passed to the PTF's eval() method.
WritableInternalTimeContext: remove the watermark check in registerOnTimeInternal() so that timers can be registered for past times. Such timers fire immediately at the next watermark advance, including when registered from within onTimer(). The previous guard also had an unintended side effect: any call to replaceNamedTimer() with a past time would delete the existing timer entry but then silently drop the new registration, leaving the named timer in a state where it appeared un-registered but the old timer was gone.
AbstractProcessTableOperator: remove the fired named timer's state entry before invoking onTimer() to prevent stale entries from accumulating in the named timers map state.
Verifying this change
Tests are updated to reflect the new semantics:
PROCESS_LATE_EVENTS: demonstrates that late events enter eval(), can register timers (including for past times), and that such timers fire immediately at the next watermark advance.
PROCESS_ROW_LATE_EVENTS (new): verifies the same for row-semantics PTFs.
PROCESS_OPTIONAL_ON_TIME / PROCESS_NAMED_TIMERS: updated to reflect that timer registrations for past times are no longer dropped. Previously, once the watermark passed the registered time, the timer was silently discarded; now it fires immediately.
Does this pull request potentially affect one of the following parts:
@Public(Evolving): noDocumentation